{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Preprocessing Using Dataflow \n",
    "\n",
    "**Learning Objectives**\n",
    "- Creating datasets for Machine Learning using Dataflow\n",
    "\n",
    "## Introduction \n",
    "\n",
    "While Pandas is fine for experimenting, for operationalization of your workflow, it is better to do preprocessing in Apache Beam. This will also help if you need to preprocess data in flight, since Apache Beam also allows for streaming."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#Ensure that we have Apache Beam version installed.\n",
    "!pip freeze | grep apache-beam || sudo pip install apache-beam[gcp]==2.12.0"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import tensorflow as tf\n",
    "import apache_beam as beam\n",
    "import shutil\n",
    "import os\n",
    "print(tf.__version__)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next, set the environment variables related to your GCP Project."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "PROJECT = \"cloud-training-demos\"  # Replace with your PROJECT\n",
    "BUCKET = \"cloud-training-bucket\"  # Replace with your BUCKET\n",
    "REGION = \"us-central1\"            # Choose an available region for Cloud MLE\n",
    "TFVERSION = \"1.14\"                # TF version for CMLE to use"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "os.environ[\"BUCKET\"] = BUCKET\n",
    "os.environ[\"PROJECT\"] = PROJECT\n",
    "os.environ[\"REGION\"] = REGION"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "if ! gcloud storage ls | grep -q gs://${BUCKET}/; then\n",
    "    gcloud storage buckets create --location ${REGION} gs://${BUCKET}\n",
    "fi"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Save the query from earlier\n",
    "\n",
    "The data is natality data (record of births in the US). My goal is to predict the baby's weight given a number of factors about the pregnancy and the baby's mother.  Later, we will want to split the data into training and eval datasets. The hash of the year-month will be used for that."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create SQL query using natality data after the year 2000\n",
    "query_string = \"\"\"\n",
    "SELECT\n",
    "    weight_pounds,\n",
    "    is_male,\n",
    "    mother_age,\n",
    "    plurality,\n",
    "    gestation_weeks,\n",
    "    FARM_FINGERPRINT(CONCAT(CAST(YEAR AS STRING), CAST(month AS STRING))) AS hashmonth\n",
    "FROM\n",
    "    publicdata.samples.natality\n",
    "WHERE\n",
    "    year > 2000\n",
    "\"\"\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Call BigQuery and examine in dataframe\n",
    "from google.cloud import bigquery\n",
    "bq = bigquery.Client(project = PROJECT)\n",
    "\n",
    "df = bq.query(query_string + \"LIMIT 100\").to_dataframe()\n",
    "df.head()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create ML dataset using Dataflow\n",
    "\n",
    "Let's use Cloud Dataflow to read in the BigQuery data, do some preprocessing, and write it out as CSV files.\n",
    "\n",
    "Instead of using Beam/Dataflow, I had three other options:\n",
    "\n",
    "* Use Cloud Dataprep to visually author a Dataflow pipeline. Cloud Dataprep also allows me to explore the data, so we could have avoided much of the handcoding of Python/Seaborn calls above as well!\n",
    "* Read from BigQuery directly using TensorFlow.\n",
    "* Use the BigQuery console (http://bigquery.cloud.google.com) to run a Query and save the result as a CSV file. For larger datasets, you may have to select the option to \"allow large results\" and save the result into a CSV file on Google Cloud Storage. \n",
    "\n",
    "However, in this case, I want to do some preprocessing, modifying data so that we can simulate what is known if no ultrasound has been performed. If I didn't need preprocessing, I could have used the web console. Also, I prefer to script it out rather than run queries on the user interface, so I am using Cloud Dataflow for the preprocessing.\n",
    "\n",
    "The `preprocess` function below includes an arugment `in_test_mode`. When this is set to `True`, running `preprocess` initiates a *local* Beam job. This is helpful for quickly debugging your pipeline and ensuring it works before submitting a job to the Cloud.  Setting `in_test_mode` to `False` will launch a processing that is happening on the Cloud. Go to the GCP webconsole to [the Dataflow section](https://pantheon.corp.google.com/dataflow) and monitor the running job. It took about 20 minutes for me.\n",
    "\n",
    "If you wish to continue without doing this step, you can copy my preprocessed output:\n",
    "<pre>\n",
    "gcloud storage cp --recursive gs://cloud-training-demos/babyweight/preproc gs://YOUR_BUCKET/\n",
    "</pre>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import apache_beam as beam\n",
    "import datetime, os\n",
    "\n",
    "def to_csv(rowdict):\n",
    "    # Pull columns from BQ and create a line\n",
    "    import hashlib\n",
    "    import copy\n",
    "    CSV_COLUMNS = \"weight_pounds,is_male,mother_age,plurality,gestation_weeks\".split(',')\n",
    "\n",
    "    # Create synthetic data where we assume that no ultrasound has been performed\n",
    "    # and so we don\"t know sex of the baby. Let\"s assume that we can tell the difference\n",
    "    # between single and multiple, but that the errors rates in determining exact number\n",
    "    # is difficult in the absence of an ultrasound.\n",
    "    no_ultrasound = copy.deepcopy(rowdict)\n",
    "    w_ultrasound = copy.deepcopy(rowdict)\n",
    "\n",
    "    no_ultrasound[\"is_male\"] = \"Unknown\"\n",
    "    if rowdict[\"plurality\"] > 1:\n",
    "        no_ultrasound[\"plurality\"] = \"Multiple(2+)\"\n",
    "    else:\n",
    "        no_ultrasound[\"plurality\"] = \"Single(1)\"\n",
    "\n",
    "    # Change the plurality column to strings\n",
    "    w_ultrasound[\"plurality\"] = [\"Single(1)\", \"Twins(2)\", \"Triplets(3)\", \"Quadruplets(4)\", \"Quintuplets(5)\"][rowdict[\"plurality\"] - 1]\n",
    "\n",
    "    # Write out two rows for each input row, one with ultrasound and one without\n",
    "    for result in [no_ultrasound, w_ultrasound]:\n",
    "        data = ','.join([str(result[k]) if k in result else \"None\" for k in CSV_COLUMNS])\n",
    "        yield str(\"{}\".format(data))\n",
    "  \n",
    "def preprocess(in_test_mode):\n",
    "    import shutil, os, subprocess\n",
    "    job_name = \"preprocess-babyweight-features\" + \"-\" + datetime.datetime.now().strftime(\"%y%m%d-%H%M%S\")\n",
    "\n",
    "    if in_test_mode:\n",
    "        print(\"Launching local job ... hang on\")\n",
    "        OUTPUT_DIR = \"./preproc\"\n",
    "        shutil.rmtree(OUTPUT_DIR, ignore_errors=True)\n",
    "        os.makedirs(OUTPUT_DIR)\n",
    "    else:\n",
    "        print(\"Launching Dataflow job {} ... hang on\".format(job_name))\n",
    "        OUTPUT_DIR = \"gs://{0}/babyweight/preproc/\".format(BUCKET)\n",
    "        try:\n",
    "            subprocess.check_call(\"gcloud storage rm --recursive {}\".format(OUTPUT_DIR).split())\n",
    "        except:\n",
    "            pass\n",
    "\n",
    "    options = {\n",
    "        \"staging_location\": os.path.join(OUTPUT_DIR, \"tmp\", \"staging\"),\n",
    "        \"temp_location\": os.path.join(OUTPUT_DIR, \"tmp\"),\n",
    "        \"job_name\": job_name,\n",
    "        \"project\": PROJECT,\n",
    "        \"teardown_policy\": \"TEARDOWN_ALWAYS\",\n",
    "        \"no_save_main_session\": True\n",
    "    }\n",
    "    opts = beam.pipeline.PipelineOptions(flags = [], **options)\n",
    "    if in_test_mode:\n",
    "        RUNNER = \"DirectRunner\"\n",
    "    else:\n",
    "        RUNNER = \"DataflowRunner\"\n",
    "        \n",
    "    p = beam.Pipeline(RUNNER, options = opts)\n",
    "    query = \"\"\"\n",
    "SELECT\n",
    "    weight_pounds,\n",
    "    is_male,\n",
    "    mother_age,\n",
    "    plurality,\n",
    "    gestation_weeks,\n",
    "    FARM_FINGERPRINT(CONCAT(CAST(YEAR AS STRING), CAST(month AS STRING))) AS hashmonth\n",
    "FROM\n",
    "    publicdata.samples.natality\n",
    "WHERE\n",
    "    year > 2000\n",
    "    AND weight_pounds > 0\n",
    "    AND mother_age > 0\n",
    "    AND plurality > 0\n",
    "    AND gestation_weeks > 0\n",
    "    AND month > 0\n",
    "\"\"\"\n",
    "\n",
    "    if in_test_mode:\n",
    "        query = query + \" LIMIT 100\" \n",
    "\n",
    "    for step in [\"train\", \"eval\"]:\n",
    "        if step == \"train\":\n",
    "            selquery = \"SELECT * FROM ({}) WHERE ABS(MOD(hashmonth, 100)) < 80\".format(query)\n",
    "        elif step == \"eval\":\n",
    "            selquery = \"SELECT * FROM ({}) WHERE ABS(MOD(hashmonth, 100)) >= 80 AND ABS(MOD(hashmonth, 100)) < 90\".format(query)\n",
    "        else: \n",
    "            selquery = \"SELECT * FROM ({}) WHERE ABS(MOD(hashmonth, 100)) >= 90\".format(query)\n",
    "        (p \n",
    "         | \"{}_read\".format(step) >> beam.io.Read(beam.io.BigQuerySource(query = selquery, use_standard_sql = True))\n",
    "         | \"{}_csv\".format(step) >> beam.FlatMap(to_csv)\n",
    "         | \"{}_out\".format(step) >> beam.io.Write(beam.io.WriteToText(os.path.join(OUTPUT_DIR, \"{}.csv\".format(step))))\n",
    "        )\n",
    "\n",
    "    job = p.run()\n",
    "    if in_test_mode:\n",
    "        job.wait_until_finish()\n",
    "        print(\"Done!\")\n",
    "    \n",
    "preprocess(in_test_mode = False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For a Cloud preprocessing job (i.e. setting `in_test_mode` to `False`), the above step will take 20+ minutes. Go to the GCP web console, navigate to the Dataflow section and <b>wait for the job to finish</b> before you run the follwing step.\n",
    "\n",
    "## View results\n",
    "We can have a look at the elements in our bucket to see the results of our pipeline above."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!gcloud storage ls gs://$BUCKET/babyweight/preproc/*-00000*"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Preprocessing with BigQuery"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create SQL query for BigQuery that will union all both the ultrasound and no ultrasound datasets."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "query = \"\"\"\n",
    "WITH CTE_Raw_Data AS (\n",
    "SELECT\n",
    "  weight_pounds,\n",
    "  CAST(is_male AS STRING) AS is_male,\n",
    "  mother_age,\n",
    "  plurality,\n",
    "  gestation_weeks,\n",
    "  FARM_FINGERPRINT(CONCAT(CAST(YEAR AS STRING), CAST(month AS STRING))) AS hashmonth\n",
    "FROM\n",
    "  publicdata.samples.natality\n",
    "WHERE\n",
    "  year > 2000\n",
    "  AND weight_pounds > 0\n",
    "  AND mother_age > 0\n",
    "  AND plurality > 0\n",
    "  AND gestation_weeks > 0\n",
    "  AND month > 0)\n",
    "\n",
    "-- Ultrasound\n",
    "SELECT\n",
    "  weight_pounds,\n",
    "  is_male,\n",
    "  mother_age,\n",
    "  CASE\n",
    "    WHEN plurality = 1 THEN \"Single(1)\"\n",
    "    WHEN plurality = 2 THEN \"Twins(2)\"\n",
    "    WHEN plurality = 3 THEN \"Triplets(3)\"\n",
    "    WHEN plurality = 4 THEN \"Quadruplets(4)\"\n",
    "    WHEN plurality = 5 THEN \"Quintuplets(5)\"\n",
    "    ELSE \"NULL\"\n",
    "  END AS plurality,\n",
    "  gestation_weeks,\n",
    "  hashmonth\n",
    "FROM\n",
    "  CTE_Raw_Data\n",
    "UNION ALL\n",
    "-- No ultrasound\n",
    "SELECT\n",
    "  weight_pounds,\n",
    "  \"Unknown\" AS is_male,\n",
    "  mother_age,\n",
    "  CASE\n",
    "    WHEN plurality = 1 THEN \"Single(1)\"\n",
    "    WHEN plurality > 1 THEN \"Multiple(2+)\"\n",
    "  END AS plurality,\n",
    "  gestation_weeks,\n",
    "  hashmonth\n",
    "FROM\n",
    "    CTE_Raw_Data\n",
    "\"\"\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create temporary BigQuery dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from google.cloud import bigquery\n",
    "\n",
    "# Construct a BigQuery client object.\n",
    "client = bigquery.Client()\n",
    "\n",
    "# Set dataset_id to the ID of the dataset to create.\n",
    "dataset_name = \"temp_babyweight_dataset\"\n",
    "dataset_id = \"{}.{}\".format(client.project, dataset_name)\n",
    "\n",
    "# Construct a full Dataset object to send to the API.\n",
    "dataset = bigquery.Dataset.from_string(dataset_id)\n",
    "\n",
    "# Specify the geographic location where the dataset should reside.\n",
    "dataset.location = \"US\"\n",
    "\n",
    "# Send the dataset to the API for creation.\n",
    "# Raises google.api_core.exceptions.Conflict if the Dataset already\n",
    "# exists within the project.\n",
    "try:\n",
    "    dataset = client.create_dataset(dataset)  # API request\n",
    "    print(\"Created dataset {}.{}\".format(client.project, dataset.dataset_id))\n",
    "except:\n",
    "    print(\"Dataset {}.{} already exists\".format(client.project, dataset.dataset_id))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Execute query and write to BigQuery table."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "job_config = bigquery.QueryJobConfig()\n",
    "for step in [\"train\", \"eval\"]:\n",
    "    if step == \"train\":\n",
    "        selquery = \"SELECT * FROM ({}) WHERE ABS(MOD(hashmonth, 100)) < 80\".format(query)\n",
    "    elif step == \"eval\":\n",
    "        selquery = \"SELECT * FROM ({}) WHERE ABS(MOD(hashmonth, 100)) >= 80 AND ABS(MOD(hashmonth, 100)) < 90\".format(query)\n",
    "    else: \n",
    "        selquery = \"SELECT * FROM ({}) WHERE ABS(MOD(hashmonth, 100)) >= 90\".format(query)\n",
    "    # Set the destination table\n",
    "    table_name = \"babyweight_{}\".format(step)\n",
    "    table_ref = client.dataset(dataset_name).table(table_name)\n",
    "    job_config.destination = table_ref\n",
    "    job_config.write_disposition = \"WRITE_TRUNCATE\"\n",
    "\n",
    "    # Start the query, passing in the extra configuration.\n",
    "    query_job = client.query(\n",
    "        query=selquery,\n",
    "        # Location must match that of the dataset(s) referenced in the query\n",
    "        # and of the destination table.\n",
    "        location=\"US\",\n",
    "        job_config=job_config)  # API request - starts the query\n",
    "\n",
    "    query_job.result()  # Waits for the query to finish\n",
    "    print(\"Query results loaded to table {}\".format(table_ref.path))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Export BigQuery table to CSV in GCS."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "dataset_ref = client.dataset(dataset_id=dataset_name, project=PROJECT)\n",
    "\n",
    "for step in [\"train\", \"eval\"]:\n",
    "    destination_uri = \"gs://{}/{}\".format(BUCKET, \"babyweight/bq_data/{}*.csv\".format(step))\n",
    "    table_name = \"babyweight_{}\".format(step)\n",
    "    table_ref = dataset_ref.table(table_name)\n",
    "    extract_job = client.extract_table(\n",
    "        table_ref,\n",
    "        destination_uri,\n",
    "        # Location must match that of the source table.\n",
    "        location=\"US\",\n",
    "    )  # API request\n",
    "    extract_job.result()  # Waits for job to complete.\n",
    "\n",
    "    print(\"Exported {}:{}.{} to {}\".format(PROJECT, dataset_name, table_name, destination_uri))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## View results\n",
    "We can have a look at the elements in our bucket to see the results of our pipeline above."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!gcloud storage ls gs://$BUCKET/babyweight/bq_data/*000000000000*"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Copyright 2017 Google Inc. Licensed under the Apache License, Version 2.0 (the \"License\"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
